package com.cupdata.zicon.jdk7concurrent.chapter3;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 
 * The CyclicBarrier class is initialized with an integer, which is the
 * number of threads that will be synchronized at a given point. When one
 * of those threads arrives at that point, it calls the await() method
 * to wait for the other threads. When a thread calls that method, the
 * CyclicBarrier class blocks it, and the thread sleeps until the other
 * threads arrive. When the last thread calls the await() method of the
 * CyclicBarrier class, it wakes up all the threads that were waiting and
 * continues with its job.
 * 
 * One interesting advantage of the CyclicBarrier class is that you can pass an
 * additional Runnable object as an initialization parameter, and the
 * CyclicBarrier class runs this object (in the last thread to arrive, before
 * the waiting threads are released) once all the threads have arrived at the
 * common point. This characteristic makes this class adequate for the
 * parallelization of tasks using the divide and conquer programming technique.
 * 
 * CyclicBarrier class also provides the getNumberWaiting() method that returns the number of
threads that are blocked in the await() method, and the getParties() method that
returns the number of tasks that are going to be synchronized with CyclicBarrier.

Resetting a CyclicBarrier object
The CyclicBarrier class has some points in common with the CountDownLatch
class, but they also have some differences. One of the most important differences is that a
CyclicBarrier object can be reset to its initial state, assigning to its internal counter the
value with which it was initialized.
This reset operation can be done using the reset() method of the CyclicBarrier
class. When this occurs, all the threads that were waiting in the await() method receive
a BrokenBarrierException exception. This exception was processed in the example
presented in this recipe by printing the stack trace, but in a more complex application, it could
perform some other operation, such as restarting their execution or recovering their operation
at the point it was interrupted.

Broken CyclicBarrier objects
A CyclicBarrier object can be in a special state denoted by broken. When there are
various threads waiting in the await() method and one of them is interrupted, this thread
receives an InterruptedException exception, but the other threads that were waiting
receive a BrokenBarrierException exception and CyclicBarrier is placed in the
broken state.
The CyclicBarrier class provides the isBroken() method, which returns true if the
object is in the broken state; otherwise it returns false.

 * 
 * @author SunYabing
 *
 */
public class CyclicBarrierTest {
	/**
	 * Builds a random matrix, splits its rows into equal slices, and starts one
	 * {@link Searcher} thread per slice. Every searcher counts the occurrences
	 * of the searched number in its slice and then waits on a shared
	 * {@link CyclicBarrier}; the barrier is created with a {@link Grouper} as
	 * its barrier action, which sums the per-row counts.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		final int ROWS = 10000;
		final int NUMBERS = 1000;
		final int SEARCH = 5;
		final int PARTICIPANTS = 5;
		// Derived from ROWS so the slices stay in step with the matrix size.
		final int LINES_PARTICIPANT = ROWS / PARTICIPANTS;

		MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
		Results results = new Results(ROWS);
		Grouper grouper = new Grouper(results);
		CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

		for (int i = 0; i < PARTICIPANTS; i++) {
			int firstRow = i * LINES_PARTICIPANT;
			int lastRow = firstRow + LINES_PARTICIPANT;
			// Search for the same number the mock counted (SEARCH), rather
			// than a duplicated literal that could drift out of sync.
			Searcher searcher = new Searcher(firstRow, lastRow, mock, results,
					SEARCH, barrier);
			new Thread(searcher).start();
		}
		System.out.printf("Main: The main thread has finished.\n");
	}
}

/**
 * In-memory matrix filled with random digits (0-9), used as the search input
 * for the demo.
 */
class MatrixMock {
	private final int[][] cells;

	/**
	 * Creates a {@code size} x {@code length} matrix of random values in
	 * {@code [0, 10)} and prints how many cells equal {@code number}.
	 *
	 * @param size   number of rows
	 * @param length number of columns in every row
	 * @param number value whose occurrences are counted while filling
	 */
	public MatrixMock(int size, int length, int number) {
		cells = new int[size][length];
		Random generator = new Random();
		int matches = 0;

		for (int[] line : cells) {
			for (int col = 0; col < line.length; col++) {
				int value = generator.nextInt(10);
				line[col] = value;
				if (value == number) {
					matches++;
				}
			}
		}

		System.out.printf(
				"Mock: There are %d ocurrences of %d in generated data.\n",
				matches, number);
	}

	/**
	 * Returns the internal array backing the requested row.
	 *
	 * @param row zero-based row index
	 * @return the row, or {@code null} when {@code row} is out of range
	 */
	public int[] getRow(int row) {
		boolean outOfRange = row < 0 || row >= cells.length;
		return outOfRange ? null : cells[row];
	}
}

/**
 * Fixed-size holder for one integer result per position.
 */
class Results {
	private final int[] counts;

	/**
	 * @param size number of result slots to allocate (all start at zero)
	 */
	public Results(int size) {
		this.counts = new int[size];
	}

	/**
	 * Stores {@code value} in the slot at {@code position}.
	 *
	 * @param position zero-based slot index
	 * @param value    value to store
	 */
	public void setData(int position, int value) {
		this.counts[position] = value;
	}

	/**
	 * @return the internal array itself (not a copy)
	 */
	public int[] getData() {
		return this.counts;
	}
}

/**
 * Task that counts, for every row in {@code [firstRow, lastRow)} of a
 * {@link MatrixMock}, how many cells equal a given number, writes each
 * per-row count into a shared {@link Results}, and finally waits on a
 * {@link CyclicBarrier}.
 */
class Searcher implements Runnable {
	private final int firstRow;
	private final int lastRow;
	private final MatrixMock mock;
	private final Results results;
	private final int number;
	private final CyclicBarrier barrier;

	/**
	 * @param firstRow first row to process (inclusive)
	 * @param lastRow  end of the range (exclusive)
	 * @param mock     matrix to read rows from
	 * @param results  holder receiving one count per processed row
	 * @param number   value to look for
	 * @param barrier  barrier to wait on once the range has been processed
	 */
	public Searcher(int firstRow, int lastRow, MatrixMock mock,
			Results results, int number, CyclicBarrier barrier) {
		this.firstRow = firstRow;
		this.lastRow = lastRow;
		this.mock = mock;
		this.results = results;
		this.number = number;
		this.barrier = barrier;
	}

	/**
	 * Processes the assigned rows and then blocks on the barrier.
	 */
	@Override
	public void run() {
		final String threadName = Thread.currentThread().getName();
		System.out.printf("%s: Processing lines from %d to %d.\n",
				threadName, firstRow, lastRow);
		for (int i = firstRow; i < lastRow; i++) {
			int[] row = mock.getRow(i);
			int counter = 0;
			for (int value : row) {
				if (value == number) {
					counter++;
				}
			}
			// The matrix is split into sub-ranges with different start and
			// end lines per searcher, so the indices written into the
			// one-dimensional results array are not expected to overlap.
			results.setData(i, counter);
		}

		System.out.printf("%s: Lines processed.\n", threadName);
		try {
			barrier.await();
		} catch (InterruptedException e) {
			// Restore the interrupt status so code further up the stack can
			// still observe that this thread was interrupted.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

/**
 * Task that adds up every value held by a {@link Results} and prints the
 * total.
 */
class Grouper implements Runnable {
	private final Results results;

	/**
	 * @param results holder whose values will be summed when the task runs
	 */
	public Grouper(Results results) {
		this.results = results;
	}

	/**
	 * Sums all stored values and prints the total.
	 */
	@Override
	public void run() {
		System.out.printf("Grouper: Processing results...\n");
		int[] counts = results.getData();
		int total = 0;
		for (int idx = 0; idx < counts.length; idx++) {
			total += counts[idx];
		}
		System.out.printf("Grouper: Total result: %d.\n", total);
	}
}
